// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//

#include "io/tablet_scanner.h"

#include <limits>

#include <gflags/gflags.h>

#include "io/tablet_io.h"
#include "proto/status_code.pb.h"
#include "util/coding.h"

DECLARE_int32(tera_tabletnode_scanner_cache_size);

namespace tera {
namespace io {

// Creates the LRU cache that stores scan contexts; its size is taken from
// FLAGS_tera_tabletnode_scanner_cache_size.
ScanContextManager::ScanContextManager()
    : cache_(leveldb::NewLRUCache(FLAGS_tera_tabletnode_scanner_cache_size)) {}
// Runs when the tablet io is unloaded. A cached scan context's iterator holds
// a reference to a version, so every cached context has to be dropped here;
// the cache is deleted while lock_ is held.
// NOTE(review): relies on the cache destructor invoking the per-entry deleter
// (LRUCacheDeleter) for the entries it still holds.
ScanContextManager::~ScanContextManager() {
  MutexLock l(&lock_);
  delete cache_;
}

// Per-entry deleter passed to the cache when a scan context is inserted.
// Frees the context together with the iterator and compact strategy attached
// to it. The context must no longer hold its own cache handle at this point
// (enforced by the CHECK). Expected to be reached with lock_ held.
static void LRUCacheDeleter(const ::leveldb::Slice& key, void* value) {
  ScanContext* evicted = static_cast<ScanContext*>(value);
  VLOG(10) << "evict from cache, " << evicted->session_id;
  CHECK(evicted->handle == NULL);

  // deleting a null pointer is a no-op, so no explicit guards are required
  delete evicted->it;
  delete evicted->compact_strategy;
  delete evicted;
}

// Finds or creates the scan context of the session named by `request` and
// queues the (response, done) job on it.
//
// Returns the context when the queued job is the only one on it; in that case
// the context keeps the cache handle in context->handle. Returns NULL when the
// job was queued behind earlier ones, or when the request was dropped because
// it claims to be part of a session that has no cached context (`done` is run
// before returning in that case).
ScanContext* ScanContextManager::GetScanContext(TabletIO* tablet_io,
                                                const ScanTabletRequest* request,
                                                ScanTabletResponse* response,
                                                google::protobuf::Closure* done) {
  // defaults every response starts with
  VLOG(10) << "push task for session id: " << request->session_id()
           << ", sequence id: " << request->sequence_id();
  response->set_results_id(std::numeric_limits<unsigned long>::max());
  response->set_complete(false);
  response->set_status(kTabletNodeOk);

  // everything below touches the cache and runs under lock_
  MutexLock l(&lock_);

  // the cache key is the fixed64-encoded session id
  char key_buf[sizeof(int64_t)];
  ::leveldb::EncodeFixed64(key_buf, request->session_id());
  const ::leveldb::Slice cache_key(key_buf, sizeof(key_buf));

  ::leveldb::Cache::Handle* cached = cache_->Lookup(cache_key);
  if (cached != NULL) {
    // follow-up rpc of a known session: the context is already initialised
    ScanContext* existing = reinterpret_cast<ScanContext*>(cache_->Value(cached));
    existing->jobs.push(ScanJob(response, done));

    const bool queued_behind_others = existing->jobs.size() > 1;
    if (queued_behind_others) {
      cache_->Release(cached);
      VLOG(10) << "push task into queue, " << request->session_id();
      return NULL;
    }

    // the first queued job keeps the cache reference on behalf of the context
    CHECK(existing->handle == NULL);
    existing->handle = cached;
    return existing;
  }

  // No cached context for this session.
  // case 1: the session's first request has not arrived yet, drop this one
  // case 2: the client resent the request after an RPC timeout
  if (request->part_of_session()) {
    VLOG(10) << "drop invalid request " << request->sequence_id() << ", session_id "
             << request->session_id();
    done->Run();
    return NULL;
  }

  // first rpc of the session: build a fresh context
  ScanContext* fresh = new ScanContext;
  fresh->session_id = request->session_id();
  fresh->tablet_io = tablet_io;
  fresh->it = nullptr;
  fresh->compact_strategy = nullptr;
  fresh->result = nullptr;
  fresh->ret_code = kTabletNodeOk;
  fresh->complete = false;
  fresh->data_idx = 0;
  fresh->version_num = 1;

  // the handle returned by Insert keeps the cache entry referenced
  fresh->handle = cache_->Insert(cache_key, fresh, 1, &LRUCacheDeleter);
  fresh->jobs.push(ScanJob(response, done));

  // the remaining context fields are initialised in TabletIO context
  return fresh;
}

// Drives the job queue of `context`. For each queued (response, done) job it
// runs one TabletIO::ProcessScan round, copies the round's outcome into the
// response and runs `done`. The loop ends when the scan completes, an error
// code is set on the context, or the queue drains.
//
// When the scan completes or fails, the context is handed to
// DeleteScanContext(), which answers the remaining jobs and removes the
// context from the cache. When the queue merely drains, the context's cache
// handle is released and the context stays cached for later requests.
//
// Returns false if the context ended with ret_code != kTabletNodeOk, true
// otherwise.
bool ScanContextManager::ScheduleScanContext(ScanContext* context) {
  while (context->ret_code == kTabletNodeOk) {
    ScanTabletResponse* response;
    ::google::protobuf::Closure* done;
    {
      // the job queue is shared with GetScanContext, so read it under lock_
      MutexLock l(&lock_);
      response = context->jobs.front().first;
      done = context->jobs.front().second;
    }
    context->result = response->mutable_results();

    context->tablet_io->ProcessScan(context);

    // reply to client
    response->set_complete(context->complete);
    response->set_status(context->ret_code);
    response->set_results_id(context->data_idx);
    response->set_data_size(context->data_size);
    response->set_row_count(context->row_count);
    response->set_cell_count(context->cell_count);
    (context->data_idx)++;
    context->result = NULL;
    done->Run();  // TODO: try async return, time consume need test

    {
      MutexLock l(&lock_);
      context->jobs.pop();

      // complete or io error, return all the rest request to client
      if (context->complete || (context->ret_code != kTabletNodeOk)) {
        // DeleteScanContext() releases the handle and erases the cache entry,
        // which lets the cache deleter free `context`. Capture the outcome
        // first so that freed memory is never read afterwards.
        const bool succeeded = (context->ret_code == kTabletNodeOk);
        DeleteScanContext(context);  // never use context
        return succeeded;
      }
      if (context->jobs.size() == 0) {
        // nothing left to serve: give the cache reference back, keep the entry
        ::leveldb::Cache::Handle* handle = context->handle;
        context->handle = NULL;
        cache_->Release(handle);  // unrefer cache item
        return true;
      }
    }
  }
  {
    // reached when ret_code was already an error before any round was run
    MutexLock l(&lock_);
    if (context->ret_code != kTabletNodeOk) {
      DeleteScanContext(context);  // never use context
      return false;
    }
  }
  return true;
}

// Answers every job still queued on `context` with the context's current
// complete flag and status, then releases the context's cache handle and
// erases its entry from the cache.
// Must be called with lock_ held; `context` must not be used afterwards.
void ScanContextManager::DeleteScanContext(ScanContext* context) {
  // the number of jobs to answer is fixed up front
  for (uint32_t pending = context->jobs.size(); pending > 0; --pending) {
    ScanTabletResponse* queued_response = context->jobs.front().first;
    ::google::protobuf::Closure* queued_done = context->jobs.front().second;

    queued_response->set_complete(context->complete);
    queued_response->set_status(context->ret_code);
    queued_done->Run();

    context->jobs.pop();
  }

  const int64_t session_id = context->session_id;
  VLOG(10) << "scan " << session_id << ", complete " << context->complete << ", ret "
           << StatusCode_Name(context->ret_code);

  // build the cache key while the context is still valid
  char key_buf[sizeof(int64_t)];
  ::leveldb::EncodeFixed64(key_buf, session_id);
  const ::leveldb::Slice cache_key(key_buf, sizeof(key_buf));

  ::leveldb::Cache::Handle* held = context->handle;
  context->handle = NULL;
  cache_->Release(held);  // unrefer cache item, no more use context!!!
  cache_->Erase(cache_key);
}

}  // namespace io
}  //  namespace tera
